Hanging Chain#
In this example, a mode predictive controller (MPC) is used to stabilize a system of weights connected by springs. The rightmost weight is fixed in place at the origin, whereas the velocity of the leftmost weight can be controlled by an actuator. The six weights in the middle move under the influence of gravity and the forces of the springs between them.
The goal of the controller is to stabilize the system (i.e. drive the velocity of all weights to zero) with the rightmost weight at position \((1, 0)\). Additionally, a non-convex cubic constraint on the weights’ position is imposed, shown in green on the figure below.
1## @example mpc/python/hanging-chain/hanging-chain-mpc.py
2# This example shows how to call the alpaqa solver from Python code, applied
3# to a challenging model predictive control problem.
4
5# %% Hanging chain MPC example
6
7import casadi as cs
8import numpy as np
9import os
10from os.path import join, dirname
11import sys
12
13sys.path.append(dirname(__file__))
14from hanging_chain_dynamics import HangingChain
15
16# %% Build the model
17
18Ts = 0.05
19N = 6 # number of balls
20dim = 2 # dimension
21
22model = HangingChain(N, dim)
23f_d = model.dynamics(Ts)
24y_null, u_null = model.initial_state()
25
26param = [0.03, 1.6, 0.033 / N] # Concrete parameters m, D, L
27
28# %% Apply an initial input to disturb the system
29
30N_dist = 3
31u_dist = [-0.5, 0.5, 0.5] if dim == 3 else [-0.5, 0.5]
32y_dist = model.simulate(N_dist, y_null, u_dist, param)
33y_dist = np.hstack((np.array([y_null]).T, y_dist))
34
35# %% Simulate the system without a controller
36
37N_sim = 180
38y_sim = model.simulate(N_sim, y_dist[:, -1], u_null, param)
39y_sim = np.hstack((y_dist, y_sim))
40
41# %% Define MPC cost and constraints
42
43N_horiz = 12
44
45L_cost = model.generate_cost_fun() # stage cost
46y_init = cs.SX.sym("y_init", *y_null.shape) # initial state
47U = cs.SX.sym("U", dim * N_horiz) # control signals over horizon
48constr_param = cs.SX.sym("c", 3) # Coefficients of cubic constraint function
49mpc_param = cs.vertcat(y_init, model.params, constr_param) # all parameters
50U_mat = model.input_to_matrix(U) # Input as dim by N_horiz matrix
51
52# Cost
53mpc_sim = model.simulate(N_horiz, y_init, U_mat, model.params)
54mpc_cost = 0
55for n in range(N_horiz): # Apply the stage cost function to each stage
56 y_n = mpc_sim[:, n]
57 u_n = U_mat[:, n]
58 mpc_cost += L_cost(y_n, u_n)
59mpc_cost_fun = cs.Function('f_mpc', [U, mpc_param], [mpc_cost])
60
61# Constraints
62g_constr = lambda c, x: c[0] * x**3 + c[1] * x**2 + c[2] * x # Cubic constr
63constr = []
64for n in range(N_horiz): # For each stage,
65 y_n = mpc_sim[:, n]
66 for i in range(N): # for each ball in the stage,
67 yx_n = y_n[dim * i] # constrain the x, y position of the ball
68 yy_n = y_n[dim * i + dim - 1]
69 constr += [yy_n - g_constr(constr_param, yx_n)]
70 constr += [y_n[-1] - g_constr(constr_param, y_n[-dim])] # Ball N+1
71mpc_constr_fun = cs.Function("g", [U, mpc_param], [cs.vertcat(*constr)])
72
73# Fill in the constraint coefficients c(x-a)³ + d(x - a) + b
74a, b, c, d = 0.6, -1.4, 5, 2.2
75constr_coeff = [c, -3 * a * c, 3 * a * a * c + d]
76constr_lb = b - c * a**3 - d * a
77
78# %% NLP formulation
79import alpaqa as pa
80
81prob = pa.generate_and_compile_casadi_problem(mpc_cost_fun, mpc_constr_fun)
82prob.C.lowerbound = -1 * np.ones((dim * N_horiz, ))
83prob.C.upperbound = +1 * np.ones((dim * N_horiz, ))
84prob.D.lowerbound = constr_lb * np.ones((len(constr), ))
85
86# %% NLP solver
87from datetime import timedelta
88
89solver = pa.ALMSolver(
90 alm_params={
91 'ε': 1e-4,
92 'δ': 1e-4,
93 'Σ_0': 1e5,
94 'max_time': timedelta(seconds=0.5),
95 },
96 inner_solver=pa.StructuredPANOCLBFGSSolver(
97 panoc_params={
98 'stop_crit': pa.ProjGradNorm2,
99 'max_time': timedelta(seconds=0.2),
100 'hessian_step_size_heuristic': 15,
101 },
102 lbfgs_params={'memory': N_horiz},
103 ),
104)
105
106# %% MPC controller
107
108
109class MPCController:
110 tot_time = timedelta()
111 tot_it = 0
112 failures = 0
113 U = np.zeros((N_horiz * dim, ))
114 λ = np.zeros(((N + 1) * N_horiz, ))
115
116 def __init__(self, model, problem):
117 self.model = model
118 self.problem = problem
119
120 def __call__(self, y_n):
121 y_n = np.array(y_n).ravel()
122 # Set the current state as the initial state
123 self.problem.param[:y_n.shape[0]] = y_n
124 # Solve the optimal control problem
125 # (warm start using the previous solution and Lagrange multipliers)
126 self.U, self.λ, stats = solver(self.problem, self.U, self.λ)
127 # Print some solver statistics
128 print(stats['status'], stats['outer_iterations'],
129 stats['inner']['iterations'], stats['elapsed_time'],
130 stats['inner_convergence_failures'])
131 self.tot_time += stats['elapsed_time']
132 self.tot_it += stats['inner']['iterations']
133 self.failures += stats['status'] != pa.SolverStatus.Converged
134 # Print the Lagrange multipliers, shows that constraints are active
135 print(np.linalg.norm(self.λ))
136 # Return the optimal control signal for the first time step
137 return self.model.input_to_matrix(self.U)[:, 0]
138
139
140# %% Simulate the system using the MPC controller
141
142y_n = np.array(y_dist[:, -1]).ravel() # initial state for controller
143n_state = y_n.shape[0]
144prob.param = np.concatenate((y_n, param, constr_coeff))
145
146y_mpc = np.empty((n_state, N_sim))
147controller = MPCController(model, prob)
148for n in range(N_sim):
149 # Solve the optimal control problem
150 u_n = controller(y_n)
151 # Apply the first optimal control signal to the system and simulate for
152 # one time step, then update the state
153 y_n = model.simulate(1, y_n, u_n, param).T
154 y_mpc[:, n] = y_n
155y_mpc = np.hstack((y_dist, y_mpc))
156
157print(controller.tot_it, controller.failures, controller.tot_time)
158print(prob.evaluations)
159print(f'{"g":13}:{prob.evaluations.g:6}: {prob.evaluations.time.g/prob.evaluations.g}')
160print(f'{"grad_L":13}:{prob.evaluations.grad_L:6}: {prob.evaluations.time.grad_L/prob.evaluations.grad_L}')
161print(f'{"ψ":13}:{prob.evaluations.ψ:6}: {prob.evaluations.time.ψ/prob.evaluations.ψ}')
162print(f'{"grad_ψ":13}:{prob.evaluations.grad_ψ:6}: {prob.evaluations.time.grad_ψ/prob.evaluations.grad_ψ}')
163print(f'{"grad_ψ_from_ŷ":13}:{prob.evaluations.grad_ψ_from_ŷ:6}: {prob.evaluations.time.grad_ψ_from_ŷ/prob.evaluations.grad_ψ_from_ŷ}')
164print(f'{"ψ_grad_ψ":13}:{prob.evaluations.ψ_grad_ψ:6}: {prob.evaluations.time.ψ_grad_ψ/prob.evaluations.ψ_grad_ψ}')
165
166# %% Visualize
167
168import matplotlib.pyplot as plt
169import matplotlib as mpl
170from matplotlib import patheffects
171from matplotlib.animation import FuncAnimation
172
173mpl.rcParams['animation.frame_format'] = 'svg'
174
175fig, ax = plt.subplots()
176x, y, z = model.state_to_pos(y_null)
177line, = ax.plot(x, y, '-o', label='Without MPC')
178line_ctrl, = ax.plot(x, y, '-o', label='With MPC')
179plt.legend()
180plt.ylim([-2.5, 1])
181plt.xlim([-0.25, 1.25])
182
183x = np.linspace(-0.25, 1.25, 256)
184y = np.linspace(-2.5, 1, 256)
185X, Y = np.meshgrid(x, y)
186Z = g_constr(constr_coeff, X) + constr_lb - Y
187
188fx = [patheffects.withTickedStroke(spacing=7, linewidth=0.8)]
189cgc = plt.contour(X, Y, Z, [0], colors='tab:green', linewidths=0.8)
190plt.setp(cgc.collections, path_effects=fx)
191
192
193class Animation:
194 points = []
195
196 def __call__(self, i):
197 x, y, z = model.state_to_pos(y_sim[:, i])
198 for p in self.points:
199 p.remove()
200 self.points = []
201 line.set_xdata(x)
202 line.set_ydata(y)
203 viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
204 if np.sum(viol):
205 self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12)
206 x, y, z = model.state_to_pos(y_mpc[:, i])
207 line_ctrl.set_xdata(x)
208 line_ctrl.set_ydata(y)
209 viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
210 if np.sum(viol):
211 self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12)
212 return [line, line_ctrl] + self.points
213
214
215ani = FuncAnimation(fig,
216 Animation(),
217 interval=1000 * Ts,
218 blit=True,
219 repeat=True,
220 frames=1 + N_dist + N_sim)
221
222# Export the animation
223out = join(dirname(__file__), '..', '..', '..', '..', 'sphinx', 'source',
224 'sphinxstatic', 'hanging-chain.html')
225os.makedirs(dirname(out), exist_ok=True)
226with open(out, "w") as f:
227 f.write('<center>')
228 f.write(ani.to_jshtml())
229 f.write('</center>')
230
231# Show the animation
232plt.show()
1from typing import Union
2import casadi as cs
3import numpy as np
4from casadi import vertcat as vc
5
6
7class HangingChain:
8 def __init__(self, N: int, dim: int):
9 self.N = N
10 self.dim = dim
11
12 self.y1 = cs.SX.sym("y1", dim * N, 1) # state: balls 1→N positions
13 self.y2 = cs.SX.sym("y2", dim * N, 1) # state: balls 1→N velocities
14 self.y3 = cs.SX.sym("y3", dim, 1) # state: ball 1+N position
15 self.u = cs.SX.sym("u", dim, 1) # input: ball 1+N velocity
16 self.y = vc(self.y1, self.y2, self.y3) # full state vector
17
18 self.m = cs.SX.sym("m") # mass
19 self.D = cs.SX.sym("D") # spring constant
20 self.L = cs.SX.sym("L") # spring length
21
22 self.params = vc(self.m, self.D, self.L)
23
24 self.g = np.array([0, 0, -9.81] if dim == 3 else [0, -9.81]) # gravity
25 self.x0 = np.zeros((dim, )) # ball 0 position
26 self.x_end = np.eye(1, dim, 0).ravel() # ball N+1 reference position
27
28 def dynamics(self, Ts=0.05):
29 y, y1, y2, y3, u = self.y, self.y1, self.y2, self.y3, self.u
30 dist = lambda xa, xb: cs.norm_2(xa - xb)
31 N, d = self.N, self.dim
32 p = self.params
33
34 # Continuous-time dynamics y' = f(y, u; p)
35
36 f1 = [y2]
37 f2 = []
38 for i in range(N):
39 xi = y1[d * i:d * i + d]
40 xip1 = y1[d * i + d:d * i + d * 2] if i < N - 1 else y3
41 Fiip1 = self.D * (1 - self.L / dist(xip1, xi)) * (xip1 - xi)
42 xim1 = y1[d * i - d:d * i] if i > 0 else self.x0
43 Fim1i = self.D * (1 - self.L / dist(xi, xim1)) * (xi - xim1)
44 fi = (Fiip1 - Fim1i) / self.m + self.g
45 f2 += [fi]
46 f3 = [u]
47
48 f_expr = vc(*f1, *f2, *f3)
49 self.f = cs.Function("f", [y, u, p], [f_expr], ["y", "u", "p"], ["y'"])
50
51 # Discretize dynamics y[k+1] = f_d(y[k], u[k]; p)
52
53 opt = {"tf": Ts, "simplify": True, "number_of_finite_elements": 4}
54 intg = cs.integrator("intg", "rk", {
55 "x": y,
56 "p": vc(u, p),
57 "ode": f_expr
58 }, opt)
59
60 f_d_expr = intg(x0=y, p=vc(u, p))["xf"]
61 self.f_d = cs.Function("f_d", [y, u, p], [f_d_expr], ["y", "u", "p"],
62 ["y+"])
63
64 return self.f_d
65
66 def state_to_pos(self, y):
67 N, d = self.N, self.dim
68 rav = lambda x: np.array(x).ravel()
69 xdim = lambda y, i: np.concatenate(
70 ([0], rav(y[i:d * N:d]), rav(y[-d + i])))
71 if d == 3:
72 return (xdim(y, 0), xdim(y, 1), xdim(y, 2))
73 else:
74 return (xdim(y, 0), xdim(y, 1), np.zeros((N + 1, )))
75
76 def input_to_matrix(self, u):
77 """
78 Reshape the input signal from a vector into a dim × N_horiz matrix (note
79 that CasADi matrices are stored column-wise and NumPy arrays row-wise)
80 """
81 if isinstance(u, np.ndarray):
82 return u.reshape((self.dim, u.shape[0] // self.dim), order='F')
83 else:
84 return u.reshape((self.dim, u.shape[0] // self.dim))
85
86 def simulate(self, N_sim: int, y_0: np.ndarray, u: Union[np.ndarray, list,
87 cs.SX.sym],
88 p: Union[np.ndarray, list, cs.SX.sym]):
89 if isinstance(u, list):
90 u = np.array(u)
91 if isinstance(u, np.ndarray):
92 if u.ndim == 1 or (u.ndim == 2 and u.shape[1] == 1):
93 if u.shape[0] == self.dim:
94 u = np.tile(u, (N_sim, 1)).T
95 return self.f_d.mapaccum(N_sim)(y_0, u, p)
96
97 def initial_state(self):
98 N, d = self.N, self.dim
99 y1_0 = np.zeros((d * N))
100 y1_0[0::d] = np.arange(1, N + 1) / (N + 1)
101 y2_0 = np.zeros((d * N))
102 y3_0 = np.zeros((d, ))
103 y3_0[0] = 1
104
105 y_null = np.concatenate((y1_0, y2_0, y3_0))
106 u_null = np.zeros((d, ))
107
108 return y_null, u_null
109
110 def generate_cost_fun(self, α=25, β=1, γ=0.01):
111 N, d = self.N, self.dim
112 y1t = cs.SX.sym("y1t", d * N, 1)
113 y2t = cs.SX.sym("y2t", d * N, 1)
114 y3t = cs.SX.sym("y3t", d, 1)
115 ut = cs.SX.sym("ut", d, 1)
116 yt = cs.vertcat(y1t, y2t, y3t)
117
118 L_cost = α * cs.sumsqr(y3t - self.x_end) + γ * cs.sumsqr(ut)
119 for i in range(N):
120 xdi = y2t[d * i:d * i + d]
121 L_cost += β * cs.sumsqr(xdi)
122 return cs.Function("L_cost", [yt, ut], [L_cost])